package kafka.springboot ;

import java.util.HashMap ;
import java.util.Map ;

import org.apache.kafka.clients.producer.ProducerConfig ;
import org.apache.kafka.common.serialization.StringSerializer ;
import org.springframework.context.annotation.Bean ;
import org.springframework.context.annotation.Configuration ;
import org.springframework.kafka.annotation.EnableKafka ;
import org.springframework.kafka.core.DefaultKafkaProducerFactory ;
import org.springframework.kafka.core.KafkaTemplate ;
import org.springframework.kafka.core.ProducerFactory ;

/**
 * kafka与springboot集成 kafka生产者配置类
 * @author 80002165 @date 2017年6月23日 下午5:25:32
 */
@Configuration
@EnableKafka
public class KafkaProduceConfig {
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<String, Object>() ;
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092") ;
        props.put(ProducerConfig.RETRIES_CONFIG, 0) ;
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096) ;
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1) ;
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960) ;
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) ;
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) ;
        return props ;
    }
    
    public ProducerFactory<String, String> producerFactory(){
        return new DefaultKafkaProducerFactory<String, String>(producerConfigs()) ;
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<String, String>(producerFactory()) ;
    }
    
    public static void main(String[] args) {
        //注入拿到kafkaTemplate发送消息
//        kafkaTemplate.send("test-topic", "hello");
//        kafkaTemplate.send("test-topic", "key-1", "hello");
    }
}
